package com.base;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import com.utils.KafkaUtil;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.rocksdb.Env;

import java.sql.SQLException;

/**
 * Kafka-backed implementation of {@link BaseETL} that exposes a topic as a {@code DataStream<String>}.
 *
 * @author Sky
 * @since 2021/8/11
 */


public class MQBaseETL implements BaseETL<String> {

    /**
     * Creates a Kafka-sourced {@code DataStream<String>} for the given topic.
     *
     * <p>The consumer deserializes records with {@link SimpleStringSchema} and is
     * configured via the shared {@code KafkaUtil.kafkaProps} properties. It starts
     * reading from the earliest available offset, so historical records are replayed.
     *
     * @param topic the Kafka topic to consume from
     * @param env   the Flink streaming execution environment the source is attached to
     * @return a stream of raw string records from the topic
     * @throws Exception declared by the {@code BaseETL} interface contract
     */
    @Override
    public DataStream<String> KafkaConsummer(String topic, StreamExecutionEnvironment env) throws Exception {
        // Diamond operator: type argument is inferred from the declaration.
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), KafkaUtil.kafkaProps);

        // Replay from the beginning of the topic rather than the committed/group offset.
        kafkaConsumer.setStartFromEarliest();

        return env.addSource(kafkaConsumer);
    }
}
